[HUDI-8780][RFC-83] Incremental Table Service #12514

Merged
merged 17 commits into from
Jan 2, 2025

Conversation

zhangyue19921010
Contributor

Change Logs

In Hudi, when scheduling Compaction and Clustering, the default behavior is to scan all partitions under the current table. When there are many historical partitions, such as 640,000 in our production environment, this scanning and planning operation becomes very inefficient. For Flink, it often leads to checkpoint timeouts, resulting in data delays.
As for cleaning, we already have the ability to do cleaning for incremental partitions.

This RFC will draw on the design of Incremental Clean to generalize the capability of processing incremental partitions to all table services, such as Clustering and Compaction.

Impact

no

Risk level (write none, low medium or high below)

low

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Dec 18, 2024

### Work Flow for Incremental Table Service

Table Service Planner
Contributor

Currently we are using completion time. Should we indicate here whether "instant" refers to request time or completion time?

Contributor Author

@zhangyue19921010 zhangyue19921010 Dec 19, 2024

Request time, I believe. With completion time, some instants may be missed under multi-writer.
Also, this getEarliestCommitToRetain can follow CleanPlanner#getEarliestCommitToRetain as a reference.

Table Service Planner
1. Retrieve the instant recorded in the last completed table service commit as **INSTANT 1**.
2. Calculate the current instant to be processed as **INSTANT 2**.
3. Obtain all partitions involved from **INSTANT 1** to **INSTANT 2** as incremental partitions and perform the table service plan operation.
Member

If we turn on the incremental table service mode, are the various flexible partition selection mechanisms now unavailable? Consider the following scenario:

  • on ts_0, write to two partitions: p_1 and p_2
  • on ts_1, schedule a compaction with a partition-selection strategy that only compacts p_2
  • on ts_2, write to p_2 again.
  • on ts_3 compaction will only process partitions written between ts_1 and ts_3. Compaction will still only merge p_2. When can a compaction occur that compacts p_1?

Contributor Author

For a common strategy, the various flexible partition selection mechanisms still work.
For an IncrementalxxxxStrategy, these flexible partition selection mechanisms apply to the incrementally fetched partitions.

Contributor Author

Also, in IncrementalxxxxStrategy, maybe we can record the missing partitions in the plan and process them together with the newly fetched incremental partitions next time.
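
The ts_0 to ts_3 scenario in this thread can be sketched roughly as follows. This is only an illustration under stated assumptions: the timeline is modeled as a sorted map from instant time to the partitions written at that instant, and the plan is assumed to record the partitions that the selection strategy skipped. `IncrementalPlanSketch`, `Plan` and `plan` are hypothetical names, not Hudi APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical stand-in for a table service planner; not a Hudi class.
final class IncrementalPlanSketch {

  /** Result of one planning round: partitions chosen now, and partitions left for later. */
  static final class Plan {
    final List<String> selected;
    final List<String> missing;

    Plan(List<String> selected, List<String> missing) {
      this.selected = selected;
      this.missing = missing;
    }
  }

  /**
   * Collects partitions written in (lastInstant, currentInstant], adds the partitions
   * the previous plan left behind, then applies the strategy's partition filter.
   */
  static Plan plan(TreeMap<String, List<String>> writesByInstant,
                   String lastInstant,
                   String currentInstant,
                   List<String> previouslyMissing,
                   Predicate<String> partitionFilter) {
    Set<String> candidates = new LinkedHashSet<>(previouslyMissing);
    // subMap(from, false, to, true) is the range (lastInstant, currentInstant].
    for (List<String> partitions
        : writesByInstant.subMap(lastInstant, false, currentInstant, true).values()) {
      candidates.addAll(partitions);
    }
    List<String> selected = new ArrayList<>();
    List<String> missing = new ArrayList<>();
    for (String partition : candidates) {
      (partitionFilter.test(partition) ? selected : missing).add(partition);
    }
    return new Plan(selected, missing);
  }

  public static void main(String[] args) {
    TreeMap<String, List<String>> writes = new TreeMap<>();
    writes.put("ts_0", List.of("p_1", "p_2"));
    writes.put("ts_2", List.of("p_2"));

    // ts_1: the strategy only picks p_2, so p_1 is recorded as missing.
    Plan first = plan(writes, "", "ts_1", List.of(), p -> p.equals("p_2"));
    // ts_3: p_1 is carried over and considered together with the new write to p_2.
    Plan second = plan(writes, "ts_1", "ts_3", first.missing, p -> true);
    System.out.println(first.selected + " " + first.missing + " " + second.selected);
  }
}
```

Under these assumptions p_1 is not lost: it is skipped at ts_1, recorded as missing, and becomes a candidate again at ts_3 even though nothing wrote to it in between.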

}

`EarliestCommitToReta` in clean commit meta
Contributor

retain

Contributor Author

Changed.

Add `EarliestCommitToReta` in HoodieCommitMetadata extra meta MAP for clustering and compaction operation which are all written-commit

```text
{"name": "earliestCommitToRetain", "type": "string"}
```
Contributor

earliestInstantToRetain or earliestCommitToRetain?

Contributor Author

Maybe we can record this earliestInstantToRetain in the clustering/compaction plan request meta file, so this change is no longer needed.

```text
{"name": "earliestCommitToRetain", "type": "string"}
```

We also need a unified interface/abstract-class to control the Plan behavior and Commit behavior of the TableService.
Contributor

Can you elaborate why this is needed?

Contributor Author

@zhangyue19921010 zhangyue19921010 Dec 19, 2024

Use PartitionBaseTableServicePlanStrategy to control how partitions are obtained and filtered, how the table service plan is generated, and so on.

Because we want to control partition acquisition, partition filtering, and plan generation through different strategies, the first step is an abstraction that gathers this logic into the base strategy.

* @return
* @throws IOException
*/
private List<String> getIncrementalPartitionPaths(Option<HoodieInstant> instantToRetain, TableServiceType type) {
Contributor

Shouldn't each table service executor already distinguish the service type? Maybe type can be eliminated.

Contributor Author

removed.

return null;
}

public R buildCommitMeta() {
Contributor

Not sure we need this.

Contributor Author

no need actually. removed.

### Work Flow for Incremental Table Service

Table Service Planner
1. Retrieve the instant recorded in the last completed table service commit as **INSTANT 1**.
Contributor

Do we want to take care of the case where INSTANT 1 has already been archived?

Contributor Author

We record EarliestCommitToRetain in the TableService Request metadata file and use it as the basis for retrieving incremental partitions.
Therefore, when Incremental Table Service is enabled, we should always ensure that there is a Clustering/Compaction request metadata in the active timeline.

Also, we can use getAllpartitions as a fallback plan.

Contributor

> Therefore, when Incremental Table Service is enabled, we should always ensure that there is a Clustering/Compaction request metadata in the active timeline.

Not really. For cleaning this is true, because there could be data quality issues if the wrong files are cleaned, but Compaction and Clustering are just rewrites.
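
The fallback discussed in this thread can be illustrated with a small sketch, assuming the planner represents "last table service instant could not be resolved, for example because it was archived" as an empty `Optional`. `PartitionFallbackSketch`, `partitionsToPlan` and the two callbacks are hypothetical names for illustration, not Hudi APIs.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch; these names are illustrative and not Hudi APIs.
final class PartitionFallbackSketch {

  /**
   * Returns incremental partitions when the last table service instant is still
   * resolvable; otherwise falls back to listing every partition of the table.
   */
  static List<String> partitionsToPlan(Optional<String> lastTableServiceInstant,
                                       Function<String, List<String>> incrementalSince,
                                       Supplier<List<String>> allPartitions) {
    return lastTableServiceInstant
        .map(incrementalSince)      // cheap path: only partitions touched since that instant
        .orElseGet(allPartitions);  // fallback: full partition listing
  }

  public static void main(String[] args) {
    Function<String, List<String>> incremental = instant -> List.of("p_2");
    Supplier<List<String>> all = () -> List.of("p_1", "p_2", "p_3");

    System.out.println(partitionsToPlan(Optional.of("ts_1"), incremental, all));
    System.out.println(partitionsToPlan(Optional.empty(), incremental, all));
  }
}
```

Because compaction and clustering only rewrite data, planning over all partitions in the fallback case costs time but, as noted above, does not raise the correctness concerns that cleaning would.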

Contributor

@danny0405 danny0405 left a comment

@zhangyue19921010 Thanks for the contribution, let's eliminate the unnecessary refactoring first and focus on the core logic.

I kind of think we should expose the incremental partitions to the specific XXXStrategy class, because the strategy class can decide the partition filtering itself which should be very related.

Let's clarify the behavior for archived table service commits.

Contributor Author

zhangyue19921010 commented Dec 19, 2024

Thanks for reviewing, @danny0405, @yuzhaojing and @TheR1sing3un.

> I kind of think we should expose the incremental partitions to the specific XXXStrategy class, because the strategy class can decide the partition filtering itself which should be very related.

Totally agree. We need to implement different behaviors such as partition acquisition, partition filtering, and plan construction through different strategies, so that it is more flexible and controllable. One prerequisite for doing so is a unified abstraction of the above API, which is why PartitionBaseTableServicePlanStrategy is designed.

> Let's clarify the behavior for archived table service commits.

Maybe we can make some changes in the archive service: when Incremental Table Service is enabled, we should always ensure that there is a Clustering/Compaction request metadata in the active timeline. Also, we can use getAllpartitions as a fallback plan.


### Abstraction

Use `PartitionBaseTableServicePlanStrategy` to control the behavior of getting partitions, filter partitions and generate table service plan etc.
Contributor

Maybe we name it IncrementalPartitionAwareStrategy to emphasize it is "incremental".

Contributor Author

changed

* Returns the earliest commit to retain from instant meta
*/
public Option<HoodieInstant> getEarliestCommitToRetain() {
throw new UnsupportedOperationException("Not support yet");
Contributor

The IncrementalPartitionAwareStrategy should be a user interface IMO; the only API we expose to users is the incremental partitions since the last table service. So the following logic should be removed:

  1. generate plan (should be the responsibility of the planner)
  2. getEarliestCommitToRetain (should be the responsibility of the planner within the plan executor)

And because the implementations of compaction and clustering are quite different, maybe we just add two new interfaces: IncrementalPartitionAwareCompactionStrategy and IncrementalPartitionAwareClusteringStrategy

Contributor Author

  1. generate plan and getEarliestCommitToRetain are removed.
  2. As for the base abstraction: although the implementations of compaction and clustering are quite different, partition-aware compaction and clustering share the same partition processing logic, that is, first obtain the partitions and then filter them. So maybe we can use one interface for both to control partition-related operations. What do you think? :)

Contributor Author

@zhangyue19921010 zhangyue19921010 Dec 20, 2024

In addition, Danny, what's your opinion on the logic of incremental partition acquisition?

Option 1: Record a metadata field in the commit to indicate where the last processing stopped. Partition acquisition under Option 1 is more flexible.

Option 2: Directly use the last completed table service commit time as the new starting point. Option 2 is simpler and does not require modifying and processing commit metadata fields.

* @return
* @throws IOException
*/
List<String> getIncrementalPartitionPaths(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient);
Contributor

Should we just add one interface `List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths, List<String> incrementalPartitionPaths);` so that the strategy can decide which partitions are chosen?

The getXXXPartitionPaths should belong to the scope of the executor/planner, let's move them out.
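
A minimal sketch of what a strategy could look like under the suggested signature, where the planner supplies both partition lists and the strategy only decides which ones to keep. `WriteConfigStub` is a stand-in for `HoodieWriteConfig`, and `PartitionFilter`, `NewestIncrementalFirst` and `maxPartitionsPerPlan` are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch; only the filterPartitionPaths shape comes from the review comment.
final class FilterPartitionPathsSketch {

  /** Stand-in for HoodieWriteConfig with a single assumed setting. */
  static final class WriteConfigStub {
    final int maxPartitionsPerPlan;

    WriteConfigStub(int maxPartitionsPerPlan) {
      this.maxPartitionsPerPlan = maxPartitionsPerPlan;
    }
  }

  /** The strategy receives partitions from the planner and only filters them. */
  interface PartitionFilter {
    List<String> filterPartitionPaths(WriteConfigStub writeConfig,
                                      List<String> allPartitionPaths,
                                      List<String> incrementalPartitionPaths);
  }

  /** Example strategy: keep only incremental partitions, newest name first, capped by config. */
  static final class NewestIncrementalFirst implements PartitionFilter {
    @Override
    public List<String> filterPartitionPaths(WriteConfigStub writeConfig,
                                             List<String> allPartitionPaths,
                                             List<String> incrementalPartitionPaths) {
      List<String> sorted = new ArrayList<>(incrementalPartitionPaths);
      sorted.sort(Comparator.reverseOrder());
      int limit = Math.min(writeConfig.maxPartitionsPerPlan, sorted.size());
      return new ArrayList<>(sorted.subList(0, limit));
    }
  }

  public static void main(String[] args) {
    List<String> all = List.of("2024/12/17", "2024/12/18", "2024/12/19", "2024/12/20");
    List<String> incremental = List.of("2024/12/18", "2024/12/20", "2024/12/19");
    PartitionFilter strategy = new NewestIncrementalFirst();
    System.out.println(strategy.filterPartitionPaths(new WriteConfigStub(2), all, incremental));
  }
}
```

Passing both lists keeps partition acquisition in the executor/planner while still letting a strategy ignore the incremental list and filter over all partitions if it prefers.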

Contributor Author

Hi @danny0405, as we discussed offline, all comments are addressed. PTAL. Thanks for your patience.

JIRA: https://issues.apache.org/jira/browse/HUDI-8780

## Abstract
Currently, Table Service, including Clustering and Compaction, need to scans all partitions of the table during the strategy
Contributor

Suggested change
Currently, Table Service, including Clustering and Compaction, need to scans all partitions of the table during the strategy
Currently, Table Service, including Clustering and Compaction, need to scan all partitions of the table during the strategy

## Design
In the design of Incremental Table service, the following principles are followed:
1. It is unaware of users, that is, it is fully adapted to the partition filtering, Target IO restrictions and other capabilities provided in the existing strategy.
2. All strategies are enabled this incremental processing by default
Contributor

Suggested change
2. All strategies are enabled this incremental processing by default
2. All strategies are working under the incremental processing by default

In the design of Incremental Table service, the following principles are followed:
1. It is unaware of users, that is, it is fully adapted to the partition filtering, Target IO restrictions and other capabilities provided in the existing strategy.
2. All strategies are enabled this incremental processing by default
3. Table services to be added later can quickly realize incremental partitioning capabilities
Contributor

To keep good extensibility for new table services.

### Abstraction

### Strategy Interface
Add a new marked strategy interface `IncrementalPartitionAwareStrategy`. Any Strategy implement this `IncrementalPartitionAwareStrategy`
Contributor

Suggested change
Add a new marked strategy interface `IncrementalPartitionAwareStrategy`. Any Strategy implement this `IncrementalPartitionAwareStrategy`
Add a new marking strategy interface `IncrementalPartitionAwareStrategy`. Any Strategy implement this `IncrementalPartitionAwareStrategy`
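
For readers unfamiliar with the marker-interface pattern, here is a rough sketch. The interface name comes from the RFC text; everything else (`FullScanStrategy`, `IncrementalStrategy`, `candidatePartitions`, and the dispatch rule itself) is an assumption made for illustration, not the merged Hudi implementation.

```java
import java.util.List;

// Hypothetical sketch of dispatching on a marker interface.
final class MarkerInterfaceSketch {

  /** Marker interface: no methods, it only tags a strategy (name taken from the RFC text). */
  interface IncrementalPartitionAwareStrategy {}

  /** Illustrative strategies only. */
  static final class FullScanStrategy {}

  static final class IncrementalStrategy implements IncrementalPartitionAwareStrategy {}

  /** Assumed planner behavior: tagged strategies see incremental partitions, others see all. */
  static List<String> candidatePartitions(Object strategy,
                                          List<String> allPartitions,
                                          List<String> incrementalPartitions) {
    return strategy instanceof IncrementalPartitionAwareStrategy
        ? incrementalPartitions
        : allPartitions;
  }

  public static void main(String[] args) {
    List<String> all = List.of("p_1", "p_2", "p_3");
    List<String> incremental = List.of("p_2");
    System.out.println(candidatePartitions(new FullScanStrategy(), all, incremental));
    System.out.println(candidatePartitions(new IncrementalStrategy(), all, incremental));
  }
}
```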

that is, first obtain the partition, and then filter the partition. The difference is that clustering obtains partitions in strategy.

Considering that partition acquisition should be a general behavior of the engine, while partition filtering should be a
specific behavior of different strategies, here we perform a small-scale reconstruction of the clustering and compaction plan to achieve:
Contributor

Suggested change
specific behavior of different strategies, here we perform a small-scale reconstruction of the clustering and compaction plan to achieve:
specific behavior of different strategies, here we perform a small refactoring to the clustering and compaction plan to achieve:


Considering that partition acquisition should be a general behavior of the engine, while partition filtering should be a
specific behavior of different strategies, here we perform a small-scale reconstruction of the clustering and compaction plan to achieve:
1. Unify partition acquisition in PlanActionExecutor
Contributor

unified

8. Finally generate the Table Service Plan and record *MISSING PARTITIONS* in it

Note: If the instant time of the last completed TableService cannot be obtained due to reasons such as archive, the partition
acquisition operation will be rollback to obtain all table partitions.
Contributor

Suggested change
acquisition operation will be rollback to obtain all table partitions.
acquisition operation will fall back to obtaining all table partitions.

Contributor Author

all changed. PTAL~

@zhangyue19921010 zhangyue19921010 merged commit c43fde5 into master Jan 2, 2025
1 of 2 checks passed
Labels
size:M PR with lines of changes in (100, 300]
4 participants